大数据Spark生态系统 修仙之道 Spark Blog

2019-05-15 Docs Language:简体中文 & English Programing Spark Website:www.geekparkhub.com OpenSource GitHub repo size in bytes GeekDeveloper:JEEP-711 Github:github.com/geekparkhub Gitee:gitee.com/geekparkhub

🐘 Spark Technology 修仙之道 金仙道果 🐘

Alt text


🔥 1. Spark 基础 🔥

1.1 Spark 概述

1.1.1 Spark 模块

enter image description here

1.1.2 Spark 特点

1.1.3 Spark 应用场景

1.2 Spark 部署

解压spark-2.1.1-bin-hadoop2.7.tgz

[root@systemhub511 software]# tar -zxvf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/

重命名spark-2.1.1-bin-hadoop2.7

[root@systemhub511 module]# mv spark-2.1.1-bin-hadoop2.7/ spark

1.3 Spark 运行模式

💥 1.3.1 Loacl Mode 💥

1.3.1.1 Loacl Mode 概述
1.3.1.2 (求π) & (WordCount) & (本地调试) 官方案例
bin/spark-submit \
--class <main-class>
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--executor-memory 1G \
--total-executor-cores 1 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
[root@systemhub511 spark]# bin/spark-submit \
> --class org.apache.spark.examples.SparkPi \
> --executor-memory 1G \
> --total-executor-cores 1 \
> ./examples/jars/spark-examples_2.11-2.1.1.jar \
> 100
INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 3.059446 s
Pi is roughly 3.1411463141146316
[root@systemhub511 spark]# bin/spark-shell
Spark context Web UI available at http://systemhub511:4040
Spark context available as 'sc' (master = local[*], app id = local-1558677071165).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.1
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_162)
Type in expressions to have them evaluated.
Type :help for more information.
scala>

enter image description here

[root@systemhub511 spark]# mkdir -p input/wordcount
[root@systemhub511 spark]# cd input/wordcount/
[root@systemhub511 wordcount]# vim wordcount_001.txt
hadoop spark hive
hadoop spark hadoop
hbase flume hive
scala java oozie
scala> sc.textFile("/opt/module/spark/input/wordcount/wordcount_001.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((scala,1), (spark,2), (hive,2), (hadoop,3), (oozie,1), (flume,1), (java,1), (hbase,1))
scala>
scala> sc.textFile("/opt/module/spark/input/wordcount/wordcount_001.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("./output/wordcount/")
[root@systemhub511 spark]# cd output/wordcount/
[root@systemhub511 wordcount]# ll
total 4
-rw-r--r--. 1 root root 79 May 24 14:48 part-00000
-rw-r--r--. 1 root root 0 May 24 14:48 _SUCCESS
[root@systemhub511 wordcount]# cat part-00000
(scala,1)
(spark,2)
(hive,2)
(hadoop,3)
(oozie,1)
(flume,1)
(java,1)
(hbase,1)
[root@systemhub511 wordcount]#
1.3.1.3 提交流程

enter image description here

1.3.1.4 数据流程
参数列表 参数描述
textFile("input") 读取本地文件input文件夹数据
flatMap(_.split(" ")) 压平操作,按照空格分割符将一行数据映射成一个个单词
map((_,1)) 对每一个元素操作,将单词映射为元组
reduceByKey(_+_) 按照key将值进行聚合相加
collect 将数据收集到Driver端展示

enter image description here

💥 1.3.2 Standalone Mode 💥

1.3.2.1 Standalone Mode 概述

enter image description here

1.3.2.2 StandaloneMode QuickStart
[root@systemhub511 spark]# cd conf/
[root@systemhub511 conf]# mv slaves.template slaves
[root@systemhub511 conf]# mv spark-env.sh.template spark-env.sh
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# A Spark Worker will be started on each of the machines listed below.
systemhub511
systemhub611
systemhub711
# Options for the daemons used in the standalone deploy mode
SPARK_MASTER_HOST=systemhub511
SPARK_MASTER_PORT=7077
[root@systemhub511 module]# scp -r spark/ root@systemhub611:/opt/module/
[root@systemhub511 module]# scp -r spark/ root@systemhub711:/opt/module/
[root@systemhub511 spark]# sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /opt/module/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-systemhub511.out
systemhub711: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-systemhub711.out
systemhub611: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-systemhub611.out
systemhub511: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-systemhub511.out
[root@systemhub511 spark]#
[root@systemhub511 spark]# jps.sh
================ root@systemhub511 All Processes ===========
30651 org.apache.spark.deploy.worker.Worker
30443 org.apache.spark.deploy.master.Master
813 sun.tools.jps.Jps
================ root@systemhub611 All Processes ===========
10369 org.apache.spark.deploy.worker.Worker
11777 sun.tools.jps.Jps
================ root@systemhub711 All Processes ===========
8960 org.apache.spark.deploy.worker.Worker
10364 sun.tools.jps.Jps
[root@systemhub511 spark]#
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://systemhub511:7077 \
--executor-memory 1G \
--total-executor-cores 1 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
[root@systemhub511 spark]# bin/spark-submit \
> --class org.apache.spark.examples.SparkPi \
> --master spark://systemhub511:7077 \
> --executor-memory 1G \
> --total-executor-cores 1 \
> ./examples/jars/spark-examples_2.11-2.1.1.jar \
> 100
INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 6.478381 s
Pi is roughly 3.1405883140588315
[root@systemhub511 spark]# bin/spark-shell --master spark://systemhub511:7077
Spark context Web UI available at http://systemhub511:4040
Spark context available as 'sc' (master = spark://systemhub511:7077, app id = app-20190524174512-0001).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.1
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_162)
Type in expressions to have them evaluated.
Type :help for more information.
scala> sc.textFile("/opt/module/spark/input/wordcount/wordcount_001.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((scala,1), (hive,2), (oozie,1), (java,1), (spark,2), (hadoop,3), (flume,1), (hbase,1))
scala>

enter image description here

[root@systemhub511 conf]# mv spark-defaults.conf.template spark-defaults.conf
spark.master spark://systemhub511:7077
spark.eventLog.enabled true
spark.eventLog.dir hdfs://systemhub511:9000/directory
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=30 -Dspark.history.fs.logDirectory=hdfs://systemhub511:9000/directory"
spark.eventLog.dir:Application在运行过程中所有信息均记录在该属性指定的路径下.
spark.history.ui.port=18080 WEBUI访问端口号为18080
spark.history.fs.logDirectory=hdfs://systemhub511:9000/directory 配置了该属性后,在start-history-server.sh时就无需再显示指定路径,Spark History Server只展示该指定路径下信息.
spark.history.retainedApplications=30 指定保存Application历史记录个数,如果超过这个值,旧应用程序信息将被删除,这个是内存中应用数,而不是页面上显示应用数.
[root@systemhub511 module]# scp -r spark/ root@systemhub611:/opt/module/
[root@systemhub511 module]# scp -r spark/ root@systemhub711:/opt/module/
[root@systemhub511 hadoop]# sbin/start-dfs.sh
[root@systemhub511 spark]# hadoop fs -mkdir /directory
[root@systemhub511 spark]# sbin/start-all.sh
[root@systemhub511 spark]# sbin/start-history-server.sh
[root@systemhub511 spark]# bin/spark-shell --master spark://systemhub511:7077
sc.textFile("/opt/module/spark/input/wordcount/wordcount_001.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

enter image description here

1.3.2.3 Spark HA 高可用

enter image description here

# SPARK_MASTER_HOST=systemhub511
# SPARK_MASTER_PORT=7077
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=systemhub511,systemhub611,systemhub711 -Dspark.deploy.zookeeper.dir=/spark"
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=30 -Dspark.history.fs.logDirectory=hdfs://systemhub511:9000/directory"
[root@systemhub511 module]# scp -r spark/ root@systemhub611:/opt/module/
[root@systemhub511 module]# scp -r spark/ root@systemhub711:/opt/module/
[root@systemhub511 spark]# /opt/module/hadoop/sbin/start-dfs.sh
[root@systemhub511 spark]# /opt/module/zookeeper/bin/zkServer.sh start
[root@systemhub611 ~]# /opt/module/zookeeper/bin/zkServer.sh start
[root@systemhub711 ~]# /opt/module/zookeeper/bin/zkServer.sh start
[root@systemhub511 spark]# sbin/start-all.sh
[root@systemhub611 ~]# /opt/module/spark/sbin/start-master.sh
[root@systemhub511 spark]# bin/spark-shell --master spark://systemhub511:7077,systemhub611:7077

http://systemhub511:8080 | systemhub511节点状态为ALIVE
http://systemhub611:8080 | systemhub611节点状态为STANDBY

enter image description here

[root@systemhub511 spark]# jps.sh
================ root@systemhub511 All Processes ===========
32242 org.apache.hadoop.hdfs.server.namenode.NameNode
11206 org.apache.spark.deploy.master.Master
11368 org.apache.spark.deploy.worker.Worker
9705 org.apache.zookeeper.server.quorum.QuorumPeerMain
32444 org.apache.hadoop.hdfs.server.datanode.DataNode
5228 sun.tools.jps.Jps
================ root@systemhub611 All Processes ===========
9157 org.apache.spark.deploy.master.Master
8901 org.apache.spark.deploy.worker.Worker
2822 sun.tools.jps.Jps
30214 org.apache.hadoop.hdfs.server.datanode.DataNode
7495 org.apache.zookeeper.server.quorum.QuorumPeerMain
================ root@systemhub711 All Processes ===========
5312 org.apache.spark.deploy.worker.Worker
31568 sun.tools.jps.Jps
26869 org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode
26647 org.apache.hadoop.hdfs.server.datanode.DataNode
4014 org.apache.zookeeper.server.quorum.QuorumPeerMain
[root@systemhub511 spark]#
[root@systemhub511 spark]# kill -9 11206

enter image description here

💥 1.3.3 Yarn Mode 💥

1.3.3.1 Yarn Mode 概述

enter image description here

1.3.3.2 YarnMode QuickStart
YARN_CONF_DIR=/opt/module/hadoop/etc/hadoop
spark.master spark://systemhub511:7077
spark.eventLog.enabled true
spark.eventLog.dir hdfs://systemhub511:9000/directory
spark.yarn.historyServer.address=systemhub511:18080
spark.history.ui.port=18080
<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true-->
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
[root@systemhub511 module]# scp -r spark/ root@systemhub611:/opt/module/
[root@systemhub511 module]# scp -r spark/ root@systemhub711:/opt/module/
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar\
100

💥 1.3.4 Mesos Mode 💥

1.3.4.1 Mesos Mode 概述

💥 1.3.5 运行模式对比 💥

模式 集群数量 集群进程 所属者
Loacl Mode 1 Spark
Standalone Mode 3 Master & Worker Spark
Yarn Mode 1 Yarn & HDFS Hadoop

💥 1.3.6 WordCount 实例 💥

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.geekparkhub.core.spark</groupId>
<artifactId>spark_server</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>spark-common</module>
</modules>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spark_server</artifactId>
<groupId>com.geekparkhub.core.spark</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spark-common</artifactId>
<build>
<finalName>WordCount</finalName>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package com.geekparkhub.core.spark.application.wordcount
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* WordCountApplication
* <p>
*/
object WordCount {
def main(args: Array[String]): Unit = {
/**
* Create SparkConf
* 创建 SparkConf
*/
val sparkConf = new SparkConf().setMaster(args(0)).setAppName("WordCountApplication")
/**
* Create SparkContext
* 创建 SparkContext
*/
val sc = new SparkContext()
/**
* Read file
* 读取文件
*/
val line: RDD[String] = sc.textFile(args(1))
/**
* To flatten
* 压平
*/
val word: RDD[String] = line.flatMap(_.split(" "))
/**
* Word conversion dual group
* 单词转换二元组
*/
val wordAndOne: RDD[(String, Int)] = word.map((_, 1))
/**
* Count the total number of words
* 统计单词总数
*/
val wordCount: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
/**
* Write out the file
* 写出文件
*/
wordCount.saveAsTextFile(args(2))
/**
* Close resource
* 关闭资源
*/
sc.stop()
}
}
[root@systemhub511 ~]# hadoop fs -mkdir -p /core_flow/spark/input/wordcount
hadoop fs -put /opt/module/spark/input/wordcount/wordcount_001.txt /core_flow/spark/input/wordcount
bin/spark-submit \
--class com.geekparkhub.core.spark.application.wordcount.WordCount \
--master yarn \
./lib_jar/WordCount.jar yarn \
/core_flow/spark/input/wordcount/wordcount_001.txt \
/core_flow/spark/output/wordcount
[root@systemhub511 spark]# hadoop fs -ls -R /core_flow/spark/output/wordcount/
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
-rw-r--r-- 3 root supergroup /core_flow/spark/output/wordcount/_SUCCESS
-rw-r--r-- 3 root supergroup /core_flow/spark/output/wordcount/part-00000
-rw-r--r-- 3 root supergroup /core_flow/spark/output/wordcount/part-00001
[root@systemhub511 spark]#
[root@systemhub511 spark]# hadoop fs -cat /core_flow/spark/output/wordcount/part-00000
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
(scala,1)
(hive,2)
(oozie,1)
(java,1)
[root@systemhub511 spark]#
[root@systemhub511 spark]# hadoop fs -cat /core_flow/spark/output/wordcount/part-00001
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
(spark,2)
(hadoop,3)
(flume,1)
(hbase,1)
[root@systemhub511 spark]#

🔥 1.3 Spark Core 🔥

1.3.1 RDD 概述

1.3.1.1 什么是RDD

RDD (Resilient Distributed Dataset)弹性分布式数据集是Spark中最基本数据抽象,代码中是一个抽象类,它代表一个弹性/不可变/可分区/里面的元素可并行计算的集合.

1.3.1.2 RDD 属性
* Internally, each RDD is characterized by five main properties:
*
* - 1. A list of partitions
* - 2. A function for computing each split
* - 3. A list of dependencies on other RDDs
* - 4. Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - 5. Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

1.一组分区(Partition),即数据集基本组成单位;
2.一个计算每个分区的函数;
3.RDD之间依赖关系;
4.一个Partitioner,即RDD分片函数;
5.一个列表,存储存取每个Partition的优先位置(preferred location)

1.3.1.3 RDD 特点

RDD表示只读分区数据集,对RDD进行改动,只能通过RDD转换操作,由一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必需的信息,RDDs之间存在依赖,RDD执行是按照血缘关系延时计算,如果血缘关系较长,可以通过持久化RDD来切断血缘关系.

1.3.1.3.1 弹性
1.3.1.3.2 分区

RDD逻辑上是分区的,每个分区数据是抽象存在的,计算时会通过一个compute函数得到每个分区数据,如果RDD是通过已有文件系统构建,则compute函数是读取指定文件系统中数据,如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD数据进行转换.

1.3.1.3.3 只读

RDD是只读的,要想改变RDD中数据,只能在现有RDD基础上创建新的RDD.

由一个RDD转换到另一个RDD,可以通过丰富的操作算子实现,不再像MapReduce那样只能写map和reduce.

RDD操作算子包括两类,一类是transformations,它是用来将RDD进行转化,构建RDD的血缘关系,另一类是actions,它是用来触发RDD计算得到RDD相关计算结果或者将RDD保存文件系统中.

1.3.1.3.4 依赖

enter image description here

如图所示,RDDs通过操作算子进行转换,转换得到新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系,也称之为依赖.

依赖包括两种,一种是窄依赖,RDDs之间分区是一一对应,另一种是宽依赖,下游RDD的每个分区与上游RDD(也称之为父RDD)的每个分区都有关,是多对多关系.

1.3.1.3.5 缓存

enter image description here

如果在应用程序中多次使用同一个RDD时,可以将该RDD缓存起来,该RDD只有在第一次计算时会根据血缘关系得到分区数据,在后续其他地方用到该RDD时,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用.

如图所示,RDD-1经过一系列转换后得到RDD-n并保存到HDFS,RDD-1在这一过程中会有个中间结果,如果将其缓存到内存,那么在随后RDD-1转换到RDD-m这一过程中,就不会计算其之前的RDD-0.

1.3.1.3.6 CheckPoint

虽然RDD血缘关系天然地可以实现容错,当RDD某个分区数据失败或丢失,可以通过血缘关系重建,但是对于长时间迭代型应用来说随着迭代进行,RDDs之间血缘关系会越来越长,一旦在后续迭代过程中出错,则需要通过非常长的血缘关系去重建,势必影响性能.

为此,RDD支持checkpoint将数据保存到持久化存储中,这样就可以切断之前血缘关系,因为checkpoint后的RDD不需要知道它的父RDDs,它可以从checkpoint处拿到数据.

1.3.2 RDD 编程

1.3.2.1 编程模型

在Spark中,RDD被表示为对象,通过对象方法调用RDD进行转换,经过一系列的transformations定义RDD之后,就可以调用actions触发RDD计算,action可以是向应用程序返回结果(count,collect等),或者是向存储系统保存数据(saveAsTextFile等).
在Spark中,只有遇到action才会执行RDD计算(即延迟计算),这样在运行时可以通过管道方式传输多个转换.
使用Spark开发者需要编写一个Driver程序,它被提交到集群以调度运行Worker,Driver中定义了一个或多个RDD.并调用RDD上的action.Worker则执行RDD分区计算任务.

1.3.2.2 RDD 创建
1.3.2.1 集合创建RDD
scala> val rdd = sc.parallelize(Array(511,611,711))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.collect
res0: Array[Int] = Array(511, 611, 711)
scala>
scala> val makerdd = sc.makeRDD(Array(511,611,711))
makerdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24
scala> makerdd.collect
res1: Array[Int] = Array(511, 611, 711)
scala>
1.3.2.2 外部存储系统数据集创建RDD
scala> sc.textFile("/opt/module/spark/input/wordcount/wordcount_001.txt")
res2: org.apache.spark.rdd.RDD[String] = /opt/module/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[3] at textFile at <console>:25
scala>
1.3.2.3 从其他创建RDD
1.3.2.3 RDD 转换
1.3.2.3.1 Value 类型
1.3.2.3.1.1 map(func) Method
scala> val rdd = sc.parallelize(Array(511,611,711))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.collect
res0: Array[Int] = Array(511, 611, 711)
scala>
scala> rdd.map((_,1)).collect
res4: Array[(Int, Int)] = Array((511,1), (611,1), (711,1))
scala>
scala> rdd.map((_*2)).collect
res5: Array[Int] = Array(1022, 1222, 1422)
scala>
1.3.2.3.1.2 mapPartitions(func) Method
scala> rdd.mapPartitions(_.map(_*2)).collect
res11: Array[Int] = Array(1022, 1222, 1422)
scala>
1.3.2.3.1.3 mapPartitionsWithIndex(func) Method
scala> rdd.mapPartitionsWithIndex((index,items)=>(items.map((index,_)))).collect
res13: Array[(Int, Int)] = Array((1,511), (2,611), (3,711))
scala>
1.3.2.3.1.4 flatMap(func) Method
scala> val text = sc.textFile("/core_flow/spark/input/wordcount/wordcount_001.txt")
text: org.apache.spark.rdd.RDD[String] = /core_flow/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[15] at textFile at <console>:24
scala> text.flatMap(_.split(" ")).collect
res16: Array[String] = Array(hadoop, spark, hive, hadoop, spark, hadoop, hbase, flume, hive, scala, java, oozie)
scala>
1.3.2.3.1.5 map()mapPartition()区别
1.3.2.3.1.6 glom Method
scala> rdd.glom.collect
res17: Array[Array[Int]] = Array(Array(), Array(511), Array(611), Array(711))
scala>
1.3.2.3.1.7 groupBy(func) Method
scala> rdd.groupBy(_ % 2).collect
res18: Array[(Int, Iterable[Int])] = Array((1,CompactBuffer(611, 711, 511)))
scala>
1.3.2.3.1.8 filter(func) Method
scala> rdd.filter(_%3==0).collect
res20: Array[Int] = Array(711)
scala>
1.3.2.3.1.9 sample(withReplacement,fraction,seed) Method
scala> val rdd = sc.parallelize(1 to 100)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:24
scala> rdd.sample(false,0.1,3).collect
res22: Array[Int] = Array(1, 33, 37, 50, 59, 69, 75, 78, 85, 98)
scala>
1.3.2.3.1.10 distinct([numTasks])) Method
scala> rdd.distinct(4).collect
res23: Array[Int] = Array(84, 100, 96, 52, 56, 4, 76, 16, 28, 80, 48, 32, 36, 24, 64, 92, 40, 72, 8, 12, 20, 60, 44, 88, 68, 13, 41, 61, 81, 21, 77, 53, 97, 25, 29, 65, 73, 57, 93, 33, 37, 45, 1, 89, 17, 69, 9, 85, 49, 5, 34, 82, 66, 22, 54, 98, 46, 30, 14, 50, 62, 42, 74, 90, 6, 70, 18, 38, 86, 58, 78, 26, 94, 10, 2, 19, 39, 15, 47, 71, 55, 95, 79, 59, 11, 35, 27, 75, 51, 23, 63, 83, 67, 3, 7, 91, 31, 87, 43, 99)
scala>
1.3.2.3.1.11 coalesce(numPartitions) Method
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:24
scala> rdd.partitions.size
res24: Int = 4
scala> val coalesceRDD = rdd.coalesce(3)
coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[28] at coalesce at <console>:26
scala> coalesceRDD.partitions.size
res25: Int = 3
scala>
1.3.2.3.1.12 repartition(numPartitions) Method
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[29] at parallelize at <console>:24
scala> rdd.partitions.size
res26: Int = 4
scala> val rerdd = rdd.repartition(2)
rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[33] at repartition at <console>:26
scala> rerdd.partitions.size
res27: Int = 2
scala>
1.3.2.3.1.13 coalescerepartition区别

1.coalesce重新分区,可以选择是否进行shuffle过程,由参数shuffle: Boolean = false/true决定.

2.repartition实际上是调用coalesce,进行shuffle过程,源码演示:

def repartition(numpartitions: int)(implicit ord: ordering[t] = null): rdd[t] = withscope {
coalesce(numpartitions, shuffle = true)
}
1.3.2.3.1.14 sortBy(func,[ascending],[numTasks]) Method
scala> val rdd = sc.parallelize(List(2,1,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:24
scala> rdd.sortBy(x => x).collect()
res29: Array[Int] = Array(1, 2, 3, 4)
scala> rdd.sortBy(x => x%3).collect()
res30: Array[Int] = Array(3, 1, 4, 2)
scala> rdd.sortBy(x => x,false).collect()
res31: Array[Int] = Array(4, 3, 2, 1)
scala>
1.3.2.3.1.15 pipe(command,[envVars]) Method
[root@systemhub511 ~]# vim /opt/module/spark/input/pipe.sh
[root@systemhub511 ~]# chmod 777 /opt/module/spark/input/pipe.sh
#!/bin/
shecho"Start"
while read LINE;do
echo ">>>" ${LINE}
done

🔒 尚未解锁 正在学习探索中… 尽情期待 Blog更新! 🔒

1.3.2.3.2 双Value类型交互
1.3.2.3.3 Key-Value 类型
1.3.2.4 Action

1.3.3 Key-Value RDD 数据分区

1.3.4 数据读取保存

1.3.5 RDD 编程进阶

🔥 1.4 Spark SQL 🔥

1.4.1 Spark SQL 概述

1.4.2 Spark SQL 查询

1.4.3 DataFrame

1.4.4 DataSet

1.4.5 聚合函数

1.4.6 Spark SQL 数据源

1.4.7 OLAP Server

1.4.8 Spark SQL 实例

🔥 1.5 Spark Streaming 🔥

1.5.1 Spark Streaming 概述

1.5.2 Spark Streaming Program

1.5.3 DataStream 概述

1.5.4 DataStream 输入

1.5.5 DataStream 转换

1.5.6 DataStream 输出

1.5.7 7*24hour运行

1.5.8 Spark Streaming 实例

🔥 2. Spark 高阶 🔥

2.1 内核机制

2.1 性能调优

3. 修仙之道 技术架构迭代 登峰造极之势

Alt text


💡如何对该开源文档进行贡献💡

  1. Blog内容大多是手敲,所以难免会有笔误,你可以帮我找错别字。

  2. 很多知识点我可能没有涉及到,所以你可以对其他知识点进行补充。

  3. 现有的知识点难免存在不完善或者错误,所以你可以对已有知识点的修改/补充。

  4. 💡欢迎贡献各领域开源野生Blog&笔记&文章&片段&分享&创想&OpenSource Project&Code&Code Review

  5. 🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈 issues: geekparkhub.github.io/issues 🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈

希望每一篇文章都能够对读者们提供帮助与提升,这乃是每一位笔者的初衷


💌感谢您的阅读 欢迎您的留言与建议💌

捐助 项目的发展离不开你的支持,请开发者喝杯☕Coffee☕吧!

enter image description here

致谢

捐助时请备注 UserName

ID UserName Donation Money Consume
1 Object WeChatPay 5RMB 一杯可乐
2 泰迪熊看月亮 AliPay 20RMB 一杯咖啡
3 修仙道长 WeChatPay 10RMB 两杯可乐

License 开源协议

Apache License Version 2.0